{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# This notebook helps you automatically import CSV files into Hive"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current\n",
      "                                 Dload  Upload   Total   Spent    Left  Speed\n",
      "100  861k  100  861k    0     0  7425k      0 --:--:-- --:--:-- --:--:-- 7425k\n",
      "  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current\n",
      "                                 Dload  Upload   Total   Spent    Left  Speed\n",
      "100  861k  100  861k    0     0  7362k      0 --:--:-- --:--:-- --:--:-- 7362k\n",
      "  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current\n",
      "                                 Dload  Upload   Total   Spent    Left  Speed\n",
      "100  861k  100  861k    0     0  7760k      0 --:--:-- --:--:-- --:--:-- 7760k\n"
     ]
    }
   ],
   "source": [
    "%%sh\n",
    "# Create the examples directory and the csvs sub-directory that the downloads below write into\n",
    "# (-p also creates the missing parent directories).\n",
    "mkdir -p \"/v3io/${V3IO_HOME}/examples/csvs\"\n",
    "\n",
    "# Download a sample stocks file from the Iguazio demo bucket in S3\n",
    "curl -L \"https://s3.wasabisys.com/iguazio/data/stocks/2018-03-26_BINS_XETR08.csv\" > \"/v3io/${V3IO_HOME}/examples/demo.csv\"\n",
    "curl -L \"https://s3.wasabisys.com/iguazio/data/stocks/2018-03-26_BINS_XETR08.csv\" > \"/v3io/${V3IO_HOME}/examples/csvs/example1.csv\"\n",
    "curl -L \"https://s3.wasabisys.com/iguazio/data/stocks/2018-03-26_BINS_XETR08.csv\" > \"/v3io/${V3IO_HOME}/examples/csvs/example2.csv\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The cell below imports the required dependencies and reads the V3IO home directory and its URL from the environment; the CSV paths used later in this notebook are built from these values."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "# Both values are read from the environment; each is None when its variable is not set.\n",
    "HOME = os.environ.get('V3IO_HOME')\n",
    "V3IO_HOME_URL = os.environ.get('V3IO_HOME_URL')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a Spark session with Hive support."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "\n",
    "# Build (or reuse) a session with Hive support, configured with the metastore URI.\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .appName(\"Import parquet schema to hive\")\n",
    "    .config(\"hive.metastore.uris\", \"thrift://hive:9083\")\n",
    "    .enableHiveSupport()\n",
    "    .getOrCreate()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The function below builds the SQL script that creates the table in Hive, using the DataFrame's column names and types as the table columns."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def getCreateTableScriptCSV(databaseName, tableName, path, df):\n",
    "    \"\"\"Build the CREATE EXTERNAL TABLE script for comma-separated data stored at `path`.\n",
    "\n",
    "    Args:\n",
    "        databaseName: Database in which the table is created.\n",
    "        tableName: Name of the table to create.\n",
    "        path: Value used in the LOCATION clause of the script.\n",
    "        df: Object whose `dtypes` attribute yields (column name, type name) pairs,\n",
    "            for example a Spark DataFrame.\n",
    "\n",
    "    Returns:\n",
    "        The generated SQL script; the script is also printed.\n",
    "    \"\"\"\n",
    "    def toColumnName(name):\n",
    "        # Keep letters, digits and underscores and turn every other character\n",
    "        # (spaces, dashes, dots, ...) into \"_\", so it cannot break the column list.\n",
    "        return \"\".join(ch if ch.isalnum() or ch == \"_\" else \"_\" for ch in name)\n",
    "\n",
    "    columns = \", \".join(toColumnName(colName) + \" \" + colType for colName, colType in df.dtypes)\n",
    "    script = (\n",
    "        \"CREATE EXTERNAL TABLE IF NOT EXISTS \" + databaseName + \".\" + tableName + \"(\" + columns\n",
    "        + \") ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '\" + path\n",
    "        + \"' TBLPROPERTIES('skip.header.line.count'='1') \"\n",
    "    )\n",
    "    print(script)\n",
    "    return script"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "def createTableCSV(databaseName, tableName, path):\n",
    "    \"\"\"Create an external table over the CSV data found at `path`.\n",
    "\n",
    "    The data is read as CSV with a header row and schema inference; the resulting\n",
    "    DataFrame is used to generate the CREATE TABLE script, which is then executed.\n",
    "    \"\"\"\n",
    "    reader = spark.read.format(\"csv\").options(header=\"true\", inferschema=\"true\")\n",
    "    inferredDf = reader.load(path)\n",
    "    spark.sql(getCreateTableScriptCSV(databaseName, tableName, path, inferredDf))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## One file example"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CREATE EXTERNAL TABLE IF NOT EXISTS default.csv_table(ISIN string, Mnemonic string, SecurityDesc string, SecurityType string, Currency string, SecurityID int, Date timestamp, Time string, StartPrice double, MaxPrice double, MinPrice double, EndPrice double, TradedVolume int, NumberOfTrades int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'v3io://users/admin/examples/demo.csv' TBLPROPERTIES('skip.header.line.count'='1') \n"
     ]
    }
   ],
   "source": [
    "# Location of the single CSV file to expose as a table.\n",
    "my_csv_file_path = os.path.join(V3IO_HOME_URL, 'examples/demo.csv')\n",
    "createTableCSV(databaseName=\"default\", tableName=\"csv_table\", path=my_csv_file_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## One folder example (for example, the output folder of a Spark job)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CREATE EXTERNAL TABLE IF NOT EXISTS default.table_from_dir2(ISIN string, Mnemonic string, SecurityDesc string, SecurityType string, Currency string, SecurityID string, Date string, Time string, StartPrice string, MaxPrice string, MinPrice string, EndPrice string, TradedVolume string, NumberOfTrades int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'v3io://users/admin/examples/csvs/' TBLPROPERTIES('skip.header.line.count'='1') \n"
     ]
    }
   ],
   "source": [
    "# Location of the folder that contains the CSV files.\n",
    "folder_path = os.path.join(V3IO_HOME_URL, 'examples/csvs/')\n",
    "createTableCSV(databaseName=\"default\", tableName=\"table_from_dir2\", path=folder_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Multiple files and folders example\n",
    "\n",
    "In the cell below, set the database name and the path to the folder that contains the CSV files (or folders of CSV files). The database must already exist.\n",
    "The code goes over all the files and directories in the given path and creates a table from each of them.\n",
    "Each file name must end with `.csv`, and the file content must be \",\"-separated.\n",
    "Directories that contain CSV files must not start with \".\" (directories whose name starts with \".\" are skipped).\n",
    "The name of the file (without the `.csv` extension) or of the directory is used as the table name."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CREATE EXTERNAL TABLE IF NOT EXISTS default.demo1(id int, street string, city string, zip int, state string, beds int, baths int, sq__ft int, type string, sale_date string, price int, latitude double, longitude double) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'v3io://users/admin/examples/csvs/demo1.csv' TBLPROPERTIES('skip.header.line.count'='1') \n",
      "CREATE EXTERNAL TABLE IF NOT EXISTS default.demo2(id int, street string, city string, zip int, state string, beds int, baths int, sq__ft int, type string, sale_date string, price int, latitude double, longitude double) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'v3io://users/admin/examples/csvs/demo2.csv' TBLPROPERTIES('skip.header.line.count'='1') \n",
      "CREATE EXTERNAL TABLE IF NOT EXISTS default.example1(ISIN string, Mnemonic string, SecurityDesc string, SecurityType string, Currency string, SecurityID int, Date timestamp, Time string, StartPrice double, MaxPrice double, MinPrice double, EndPrice double, TradedVolume int, NumberOfTrades int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'v3io://users/admin/examples/csvs/example1.csv' TBLPROPERTIES('skip.header.line.count'='1') \n",
      "CREATE EXTERNAL TABLE IF NOT EXISTS default.example2(ISIN string, Mnemonic string, SecurityDesc string, SecurityType string, Currency string, SecurityID int, Date timestamp, Time string, StartPrice double, MaxPrice double, MinPrice double, EndPrice double, TradedVolume int, NumberOfTrades int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'v3io://users/admin/examples/csvs/example2.csv' TBLPROPERTIES('skip.header.line.count'='1') \n"
     ]
    }
   ],
   "source": [
    "databaseName = \"default\"\n",
    "filepath = os.path.join('/v3io', HOME, 'examples/csvs')\n",
    "# The listing uses the local /v3io path; the table location is passed on as a v3io:// URL.\n",
    "v3ioPath = filepath.replace(\"/v3io/\", \"v3io://\", 1)\n",
    "\n",
    "# Sort the entries so that the tables are created in a stable order.\n",
    "for fileOrDir in sorted(os.listdir(filepath)):\n",
    "    if fileOrDir.endswith(\".csv\"):\n",
    "        # The table is named after the entry, without its .csv extension.\n",
    "        createTableCSV(databaseName, os.path.splitext(fileOrDir)[0], v3ioPath + \"/\" + fileOrDir)\n",
    "    elif not fileOrDir.startswith(\".\") and os.path.isdir(os.path.join(filepath, fileOrDir)):\n",
    "        # Only non-hidden directories are treated as folders of CSV files;\n",
    "        # other entries (for example a README file) are ignored.\n",
    "        createTableCSV(databaseName, fileOrDir, v3ioPath + \"/\" + fileOrDir + \"/*\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Test how it works"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+\n",
      "|databaseName|\n",
      "+------------+\n",
      "|     default|\n",
      "+------------+\n",
      "\n",
      "+--------+---------------+-----------+\n",
      "|database|      tableName|isTemporary|\n",
      "+--------+---------------+-----------+\n",
      "| default|      csv_table|      false|\n",
      "| default|          demo1|      false|\n",
      "| default|          demo2|      false|\n",
      "| default|       example1|      false|\n",
      "| default|       example2|      false|\n",
      "| default|table_from_dir2|      false|\n",
      "+--------+---------------+-----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# List the databases, then the tables that exist in the target database.\n",
    "spark.sql(\"show databases\").show()\n",
    "tablesQuery = \"show tables in \" + databaseName\n",
    "spark.sql(tablesQuery).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[ISIN: string, Mnemonic: string, SecurityDesc: string, SecurityType: string, Currency: string, SecurityID: int, Date: timestamp, Time: string, StartPrice: double, MaxPrice: double, MinPrice: double, EndPrice: double, TradedVolume: int, NumberOfTrades: int]"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Query one of the imported tables.\n",
    "tableName = \"example1\"\n",
    "selectQuery = \"select * from \" + databaseName + \".\" + tableName\n",
    "spark.sql(selectQuery)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[]"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Remove the example1 table from the database.\n",
    "dropStatement = \"drop table \" + databaseName + \".example1\"\n",
    "spark.sql(dropStatement)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
